
-- This plugin is developed on top of APISIX's built-in http-logger plugin;
-- most of its runtime flow follows http-logger's logic.

local batch_processor = require("apisix.utils.batch-processor")
local log_util = require("apisix.utils.log-util")
local core = require("apisix.core")
local http = require("resty.http")
local url = require("net.url")
local plugin = require("apisix.plugin")

local ngx = ngx
local tostring = tostring
local pairs = pairs
local ipairs = ipairs
local str_byte = string.byte
local timer_at = ngx.timer.at

local plugin_name = "postgresql-logger"
local stale_timer_running = false
local buffers = {}
local lru_log_format = core.lrucache.new({
    ttl = 300, count = 512
})
--导入依赖
local luasql = require "luasql.postgres"

-- Configuration schema for this plugin. APISIX validates each route's plugin
-- configuration against it at load time; an invalid configuration disables
-- the plugin for that route.
local schema = {
    type = "object",
    properties = {
        host = { type = "string" }, -- PostgreSQL server address
        port = { type = "string" }, -- PostgreSQL server port (string; converted with tonumber before connecting)
        dbname = { type = "string" }, -- database holding the log table
        username = { type = "string" }, -- PostgreSQL user name
        password = { type = "string" }, -- PostgreSQL password
        tablename = { type = "string" }, -- table the log rows are inserted into
        uri = core.schema.uri_def,
        auth_header = { type = "string", default = "" },
        timeout = { type = "integer", minimum = 1, default = 3 },
        name = { type = "string", default = "postgresql-logger" }, -- batch_processor parameter
        max_retry_count = { type = "integer", minimum = 0, default = 0 }, -- batch_processor parameter
        retry_delay = { type = "integer", minimum = 0, default = 1 }, -- batch_processor parameter
        buffer_duration = { type = "integer", minimum = 1, default = 60 }, -- batch_processor parameter
        inactive_timeout = { type = "integer", minimum = 1, default = 5 }, -- batch_processor parameter
        batch_max_size = { type = "integer", minimum = 1, default = 1000 }, -- batch_processor parameter
        include_req_body = { type = "boolean", default = false },
        concat_method = { type = "string", default = "json",
                          enum = { "json", "new_line" } }
    },
    required = { "host", "port", "dbname", "username", "password","tablename" }
    -- Mandatory fields: APISIX checks every plugin configuration at startup;
    -- a route with an invalid configuration will not forward through this plugin.
}

-- Plugin metadata schema: defines the default log format. Values starting
-- with "$" are resolved against the request's nginx/APISIX variables when
-- the log entry is built (see gen_log_format).
local metadata_schema = {
    type = "object",
    properties = {
        log_format = {
            type = "object",
            default = {
                -- Fix: the original listed ["client_ip"] twice (duplicate
                -- table key; the later entry silently won). Listed once here.
                ["host"] = "$host",
                ["@timestamp"] = "$time_iso8601",
                ["client_ip"] = "$remote_addr",
                ["route_id"] = "$route_id",
                -- NOTE(review): the next three all resolve to $route_id in the
                -- original (looks like copy-paste); kept as-is to preserve the
                -- default output — confirm intended variables with the author.
                ["route_name"] = "$route_id",
                ["service_id"] = "$route_id",
                ["service_name"] = "$route_id",
            },
        },
    },
    additionalProperties = false,
}

-- Plugin descriptor table registered with APISIX.
local _M = {
    version = 0.1,
    priority = 410, -- execution order relative to other plugins
    name = plugin_name,
    schema = schema,
    metadata_schema = metadata_schema,
}

-- Validate a user-supplied plugin configuration against the schema above.
-- conf: the configuration table attached to a route/service.
-- Returns true on success, or false plus an error message.
function _M.check_schema(conf)
    local ok, err = core.schema.check(schema, conf)
    return ok, err
end

-- Insert one encoded log payload into the configured PostgreSQL table.
-- conf: validated plugin configuration (host/port/dbname/credentials/tablename).
-- log_message: the already-encoded payload (a JSON string or newline-joined lines).
-- Returns true on success, or false plus an error message; the batch
-- processor uses this pair to decide whether to retry the flush.
local function send_postgresql_data(conf, log_message)
    -- Create a luasql environment for this send (one connection per flush).
    local env = luasql.postgres()
    if env == nil then
        return false, "luasql-postgres environment failure"
    end

    -- conf.port is declared as a string in the schema, so convert it here.
    local conn, conn_err = env:connect(conf.dbname, conf.username, conf.password,
                                       conf.host, tonumber(conf.port))
    if conn == nil then
        env:close()
        return false, "connect postgresql server failure: "
                      .. tostring(conn_err or "unknown error")
    end

    -- Escape embedded single quotes so the payload cannot break out of the
    -- SQL string literal (the log body carries request data such as URIs).
    -- conf.tablename is operator-controlled configuration, not request data.
    local escaped = log_message:gsub("'", "''")
    local affected, exec_err = conn:execute(
        "INSERT INTO " .. conf.tablename .. " (log) VALUES ('" .. escaped .. "');")

    -- Always release resources on every path.
    conn:close()
    env:close()

    -- conn:execute returns the affected row count (nil, err on failure);
    -- exactly one row must have been inserted.
    if affected ~= 1.0 then
        return false, "insert into " .. conf.tablename .. " failure in "
                      .. conf.dbname .. " of database"
                      .. (exec_err and (": " .. tostring(exec_err)) or "")
    end

    return true, nil
end

-- Compile the plugin metadata's log_format into the internal representation.
-- Each field becomes { is_variable, name }: values starting with "$" are
-- looked up in ctx.var at log time, anything else is emitted literally.
-- Returns an empty table when no metadata is configured, which makes the
-- caller fall back to log_util.get_full_log().
local function gen_log_format(metadata)
    local log_format = {}
    if metadata == nil then
        return log_format
    end

    for field, value in pairs(metadata.value.log_format) do
        if value:sub(1, 1) == "$" then
            -- "$name": strip the sigil and mark as a variable lookup.
            log_format[field] = { true, value:sub(2) }
        else
            log_format[field] = { false, value }
        end
    end

    -- Report the compiled format to the APISIX info log.
    core.log.info("log_format: ", core.json.delay_encode(log_format))
    return log_format
end


-- Timer callback: drop batch processors whose buffers are completely empty
-- so that per-conf state does not accumulate indefinitely.
-- premature: true when nginx is shutting down and the timer fired early.
local function remove_stale_objects(premature)
    if premature then
        return
    end

    -- Fix: `buffers` is keyed by the plugin conf table (a non-integer key),
    -- so it must be walked with pairs(); the original ipairs() never visited
    -- any entry and stale processors were never released.
    for key, batch in pairs(buffers) do
        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
            core.log.warn("removing batch processor stale object, conf: ",
                    core.json.delay_encode(key))
            buffers[key] = nil -- release the processor for this conf
        end
    end

    stale_timer_running = false -- allow the next log() call to re-arm the timer
end

-- Log-phase handler: APISIX invokes this once per request after the response
-- is sent. It builds a log entry from the configured format and pushes it to
-- a per-conf batch processor, which flushes to PostgreSQL asynchronously.
function _M.log(conf, ctx)
    -- Fetch plugin metadata (holds the optional custom log_format).
    local metadata = plugin.plugin_metadata(plugin_name)
    -- Record the metadata in APISIX's runtime log.
    core.log.info("metadata: ", core.json.delay_encode(metadata))

    local entry
    -- Resolve (and LRU-cache) the compiled log format for this metadata.
    local log_format = lru_log_format(metadata or "", nil, gen_log_format,
            metadata)
    -- Build the entry from the compiled format when one is configured.
    if core.table.nkeys(log_format) > 0 then
        entry = core.table.new(0, core.table.nkeys(log_format))
        for k, var_attr in pairs(log_format) do
            if var_attr[1] then
                -- "$name" entries read the request/nginx variable.
                entry[k] = ctx.var[var_attr[2]]
            else
                entry[k] = var_attr[2]
            end
        end

        local matched_route = ctx.matched_route and ctx.matched_route.value
        if matched_route then
            entry.service_id = matched_route.service_id
            entry.route_id = matched_route.id
        end
    else
        -- No custom format configured: use the standard full request log.
        entry = log_util.get_full_log(ngx, conf)
    end

    -- Requests that matched no route are still logged, with a marker id.
    if not entry.route_id then
        entry.route_id = "no-matched"
    end

    -- Arm at most one cleanup timer: entries idle in the cache for more than
    -- 30 minutes are removed by remove_stale_objects.
    if not stale_timer_running then
        timer_at(1800, remove_stale_objects)
        stale_timer_running = true -- mark the cleanup timer as armed
    end

    local log_buffer = buffers[conf]

    if log_buffer then
        log_buffer:push(entry)
        return
    end

    -- Flush callback for the batch processor: encode the buffered entries
    -- according to conf.concat_method and send them to PostgreSQL.
    local func = function(entries, batch_max_size)
        local data, err

        if conf.concat_method == "json" then
            if batch_max_size == 1 then
                -- JSON format with a batch size of 1: encode the single entry.
                data, err = core.json.encode(entries[1]) -- a single JSON object {}
            else
                -- Batch size > 1: encode all entries as one JSON array.
                data, err = core.json.encode(entries)
            end

        elseif conf.concat_method == "new_line" then
            if batch_max_size == 1 then
                data, err = core.json.encode(entries[1]) -- single entry, single line
            else
                local t = core.table.new(#entries, 0)
                for i, entry in ipairs(entries) do
                    t[i], err = core.json.encode(entry)
                    if err then
                        core.log.warn("failed to encode http log: ", err, ", log data: ", entry)
                        break
                    end
                end
                data = core.table.concat(t, "\n") -- one JSON object per line, newline-separated
            end

        else
            -- Only "json" and "new_line" are supported; anything else errors.
            err = "unknown concat_method " .. (conf.concat_method or "nil")
        end

        -- Encoding failed: report to the batch processor (triggers retry logic).
        if not data then
            return false, 'error occurred while encoding the data: ' .. err
        end
        -- Encoded successfully: send to PostgreSQL.
        return send_postgresql_data(conf, data)
    end

    -- Batch-processor settings taken from the plugin configuration.
    local config = {
        name = conf.name,
        retry_delay = conf.retry_delay,
        batch_max_size = conf.batch_max_size,
        max_retry_count = conf.max_retry_count,
        buffer_duration = conf.buffer_duration,
        inactive_timeout = conf.inactive_timeout,
        route_id = ctx.var.route_id,
        server_addr = ctx.var.server_addr,
    }

    local err
    log_buffer, err = batch_processor:new(func, config) -- delegate sending to batch_processor


    if not log_buffer then
        core.log.error("error when creating the batch processor: ", err)
        return
    end

    buffers[conf] = log_buffer
    log_buffer:push(entry)
end

return _M
